package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.schedule.dtos.TaskDto;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskinfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;

/**
 * @author 陈辉
 * @data 2023 16:38
 */
@Service
@Slf4j
@Transactional
public class TaskinfoServiceImpl extends ServiceImpl<TaskinfoMapper, Taskinfo> implements TaskinfoService {

    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    @Autowired
    private CacheService cacheService;

    /**
     * 添加任务
     *
     * @param taskDto
     * @return
     */
    @Override
    public ResponseResult addTask(TaskDto taskDto) {
        //1. 将任务存入数据库
        addTaskToDB(taskDto);
        //2. 将任务存入缓存redis
        addTaskToRedis(taskDto);
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    //将任务保存到redis缓存
    private void addTaskToRedis(TaskDto taskDto) {
        long taskExecuteTime = taskDto.getExecuteTime();
        long now = System.currentTimeMillis();
        long zSetTime = now + (1000 * 60 * 5);
        String listKey = ScheduleConstants.TOPIC + taskDto.getTaskType() + "_" + taskDto.getPriority();
        String zSetKey = ScheduleConstants.FUTURE + taskDto.getTaskType() + "_" + taskDto.getPriority();

        //1. List: 任务执行时间   <= 当前系统时间
        if (taskExecuteTime <= now) {
            cacheService.lLeftPush(listKey, JSON.toJSONString(taskDto));
        } else if (taskExecuteTime > now && taskExecuteTime <= zSetTime) {
            //2. ZSet: 当前系统时间+5分钟 >= 任务执行时间   > 当前系统时间
            cacheService.zAdd(zSetKey, JSON.toJSONString(taskDto), taskDto.getExecuteTime());
        }
    }

    //将任务保存到数据库
    private void addTaskToDB(TaskDto taskDto) {
        //1. 将任务信息保存到task_info表中
        Taskinfo taskinfo = new Taskinfo();
        BeanUtils.copyProperties(taskDto, taskinfo);
        taskinfo.setExecuteTime(new Date(taskDto.getExecuteTime()));
        this.save(taskinfo);

        //将taskId回填到taskDto中
        taskDto.setTaskId(taskinfo.getTaskId());

        //2. 往task_info_logs表中记录任务日志信息
        TaskinfoLogs logs = new TaskinfoLogs();
        BeanUtils.copyProperties(taskinfo, logs);
        logs.setVersion(1);     //乐观锁
        logs.setStatus(ScheduleConstants.SCHEDULED);      //初始状态
        taskinfoLogsMapper.insert(logs);
    }

    /**
     * 拉取任务
     *
     * @param taskType
     * @param priority
     * @return
     */
    @Override
    public ResponseResult pullTask(Integer taskType, Integer priority) {
        if (taskType != null && priority != null) {
            //核心：1. 从list中拉取任务数据
            String listKey = ScheduleConstants.TOPIC + taskType + "_" + priority;
            String taskJson = cacheService.lRightPop(listKey);

            //有任务：返回：200，taskDto
            if (StringUtils.isNotBlank(taskJson)) {
                //2. 任务一旦被拉出缓存，同时要将任务信息从taskinfo表中移除
                TaskDto taskDto = JSON.parseObject(taskJson, TaskDto.class);
                this.removeById(taskDto.getTaskId());
                //3. 任务一旦被拉出缓存，同时要将任务日志的状态变更为：EXECUTED=1; //已执行状态
                TaskinfoLogs logs = new TaskinfoLogs();
                logs.setTaskId(taskDto.getTaskId());
                logs.setStatus(ScheduleConstants.EXECUTED);
                taskinfoLogsMapper.updateById(logs);
                return ResponseResult.okResult(taskJson);
            }else{
                //没有任务：返回：1002，数据不存在
                return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
            }
        } else {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
    }

    /**
     * 定时刷新ZSet数据到List
     */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void refreshZSetToList(){
        log.info("定时刷新ZSet数据到List");
        //1. 先从redis中查找 future_作为前缀的key
        Set<String> zSetKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        //2. 遍历ZSet
        if (zSetKeys != null && zSetKeys.size() >0){
            for (String zSetKey : zSetKeys) {       //futureKey: future_1001_1
                String[] split = zSetKey.split(ScheduleConstants.FUTURE);     //{"","1001_1"}
                //2.1 拼接得到listKey
                String listKey = ScheduleConstants.TOPIC+split[1];
                Set<String> tasks = cacheService.zRangeByScore(zSetKey, 0, System.currentTimeMillis());
                //批量将ZSet集合中的任务转移到list集合中
                if (tasks != null && tasks.size() > 0) {
                    cacheService.refreshWithPipeline(zSetKey, listKey, tasks);
                }
            }
        }

    }

    /**
     * 定时刷新数据库数据到缓存
     */
    @Scheduled(cron = "0 0/5 * * * ? ")
    public void refreshDBToRedis(){
        log.info("定时刷新数据库到redis");
        //查询数据库中任务： 执行时间 <= 当前系统时间5分钟
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);

        LambdaQueryWrapper<Taskinfo> wrapper = Wrappers.<Taskinfo>lambdaQuery()
                .le(Taskinfo::getExecuteTime, calendar.getTime());

        List<Taskinfo> taskinfos = this.list(wrapper);
        if (taskinfos != null && taskinfos.size() > 0){
            for (Taskinfo taskinfo : taskinfos) {
                TaskDto taskDto = new TaskDto();
                BeanUtils.copyProperties(taskinfo,taskDto);


                //先清空redis中任务：防止任务被重复消费
                Set<String> zSetKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
                Set<String> listKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");

                cacheService.delete(zSetKeys);
                cacheService.delete(listKeys);

                //将任务同步到redis
                addTaskToRedis(taskDto);
            }

        }

    }

}
